代码生成与补全概述#
Claude Code 提供了强大的代码生成与补全功能,可以根据自然语言描述、代码片段、API 规范等生成高质量的代码,并智能补全未完成的代码。
代码生成#
1. 从自然语言生成代码#
示例:从自然语言生成代码
用户请求: "创建一个用户认证系统,支持注册、登录、密码重置功能"
Claude Code 生成的代码:
python````python from flask import Flask, request, jsonify from flask_sqlalchemy import SQLAlchemy from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity from flask_mail import Mail, Message from datetime import datetime, timedelta import bcrypt import secrets app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///auth.db' app.config['JWT_SECRET_KEY'] = 'your-secret-key' app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=24) app.config['MAIL_SERVER'] = 'smtp.gmail.com' app.config['MAIL_PORT'] = 587 app.config['MAIL_USE_TLS'] = True app.config['MAIL_USERNAME'] = 'your-email@gmail.com' app.config['MAIL_PASSWORD'] = 'your-password' db = SQLAlchemy(app) jwt = JWTManager(app) mail = Mail(app) class User(db.Model): """用户模型""" id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) email = db.Column(db.String(120), unique=True, nullable=False) password_hash = db.Column(db.String(128), nullable=False) is_active = db.Column(db.Boolean, default=True) created_at = db.Column(db.DateTime, default=datetime.utcnow) updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) reset_token = db.Column(db.String(100), unique=True) reset_token_expires = db.Column(db.DateTime) class PasswordReset(db.Model): """密码重置记录""" id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False) token = db.Column(db.String(100), unique=True, nullable=False) expires_at = db.Column(db.DateTime, nullable=False) used = db.Column(db.Boolean, default=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) @app.route('/register', methods=['POST']) def register(): """用户注册""" data = request.get_json() # 验证输入 if not data.get('username') or not data.get('email') or not data.get('password'): return jsonify({'error': 'Missing required fields'}), 400 # 检查用户名是否已存在 if User.query.filter_by(username=data['username']).first(): return jsonify({'error': 'Username already exists'}), 400 # 检查邮箱是否已存在 if User.query.filter_by(email=data['email']).first(): return jsonify({'error': 'Email already exists'}), 400 # 验证密码强度 if len(data['password']) < 8: return jsonify({'error': 'Password must be at least 8 characters'}), 400 # 创建用户 user = User( username=data['username'], email=data['email'], password_hash=bcrypt.hashpw( data['password'].encode('utf-8'), bcrypt.gensalt() ).decode('utf-8') ) db.session.add(user) db.session.commit() # 生成访问令牌 access_token = create_access_token(identity=user.id) return jsonify({ 'message': 'User registered successfully', 'user': { 'id': user.id, 'username': user.username, 'email': user.email }, 'access_token': access_token }), 201 @app.route('/login', methods=['POST']) def login(): """用户登录""" data = request.get_json() # 验证输入 if not data.get('username') or not data.get('password'): return jsonify({'error': 'Missing username or password'}), 400 # 查找用户 user = User.query.filter_by(username=data['username']).first() if not user or not bcrypt.checkpw( data['password'].encode('utf-8'), user.password_hash.encode('utf-8') ): return jsonify({'error': 'Invalid username or password'}), 401 if not user.is_active: return jsonify({'error': 'Account is deactivated'}), 403 # 生成访问令牌 access_token = create_access_token(identity=user.id) return jsonify({ 'message': 'Login successful', 'user': { 'id': user.id, 'username': user.username, 'email': user.email }, 'access_token': access_token }) @app.route('/me', methods=['GET']) @jwt_required() def get_current_user(): """获取当前用户信息""" user_id = get_jwt_identity() user = User.query.get(user_id) if not user: return jsonify({'error': 'User not found'}), 404 return jsonify({ 'id': user.id, 'username': user.username, 'email': user.email, 'is_active': user.is_active, 'created_at': user.created_at.isoformat() }) @app.route('/request-password-reset', methods=['POST']) def request_password_reset(): """请求密码重置""" data = request.get_json() if not data.get('email'): return jsonify({'error': 'Email is required'}), 400 user = User.query.filter_by(email=data['email']).first() if not user: # 为了安全,不透露邮箱是否存在 return jsonify({ 'message': 'If the email exists, a reset link has been sent' }), 200 # 生成重置令牌 reset_token = secrets.token_urlsafe(32) reset_token_expires = datetime.utcnow() + timedelta(hours=1) # 保存重置令牌 password_reset = PasswordReset( user_id=user.id, token=reset_token, expires_at=reset_token_expires ) db.session.add(password_reset) db.session.commit() # 发送重置邮件 reset_link = f"http://localhost:3000/reset-password?token={reset_token}" msg = Message(
'Password Reset Request', recipients=[user.email] ) msg.body = f''' Hello {user.username},
You have requested to reset your password. Click the link below to reset your password:
{reset_link}
This link will expire in 1 hour.
If you did not request this password reset, please ignore this email. '''
mail.send(msg)
return jsonify({ 'message': 'If the email exists, a reset link has been sent' }), 200
@app.route('/reset-password', methods=['POST']) def reset_password(): """重置密码""" data = request.get_json()
if not data.get('token') or not data.get('password'): return jsonify({'error': 'Token and password are required'}), 400
查找重置记录
password_reset = PasswordReset.query.filter_by( token=data['token'], used=False ).first()
if not password_reset: return jsonify({'error': 'Invalid or expired reset token'}), 400
if password_reset.expires_at < datetime.utcnow(): return jsonify({'error': 'Reset token has expired'}), 400
验证密码强度
if len(data['password']) < 8: return jsonify({'error': 'Password must be at least 8 characters'}), 400
更新密码
user = User.query.get(password_reset.user_id) user.password_hash = bcrypt.hashpw( data['password'].encode('utf-8'), bcrypt.gensalt() ).decode('utf-8')
标记重置令牌为已使用
password_reset.used = True
db.session.commit()
return jsonify({'message': 'Password reset successful'}), 200
@app.route('/change-password', methods=['POST']) @jwt_required() def change_password(): """修改密码""" data = request.get_json() user_id = get_jwt_identity()
if not data.get('current_password') or not data.get('new_password'): return jsonify({'error': 'Current password and new password are required'}), 400
user = User.query.get(user_id)
验证当前密码
if not bcrypt.checkpw( data['current_password'].encode('utf-8'), user.password_hash.encode('utf-8') ): return jsonify({'error': 'Current password is incorrect'}), 400
验证新密码强度
if len(data['new_password']) < 8: return jsonify({'error': 'New password must be at least 8 characters'}), 400
更新密码
user.password_hash = bcrypt.hashpw( data['new_password'].encode('utf-8'), bcrypt.gensalt() ).decode('utf-8')
db.session.commit()
return jsonify({'message': 'Password changed successfully'}), 200
if name == 'main': with app.app_context(): db.create_all() app.run(debug=True)
bash> - 用户注册功能 > - 用户登录功能 > - JWT 认证 > - 密码重置请求 > - 密码重置 > - 修改密码 > - 邮件发送 > - 数据验证 > - 错误处理
2. 从 API 规范生成代码#
示例:从 OpenAPI 规范生成代码
用户请求: "根据这个 OpenAPI 规范生成 API 代码" OpenAPI 规范:
yaml`yaml openapi: 3.0.0 info: title: Product API version: 1.0.0 paths: /products: get: summary: Get all products parameters: - name: page in: query schema: type: integer - name: per_page in: query schema: type: integer responses: '200': description: Success content: application/json: schema: type: object properties: products: type: array items: $ref: '#/components/schemas/Product' total: type: integer page: type: integer post: summary: Create a product requestBody: required: true content: application/json: schema: $ref: '#/components/schemas/ProductCreate' responses: '201': description: Created content: application/json: schema: $ref: '#/components/schemas/Product' /products/{product_id}: get: summary: Get a product parameters: - name: product_id in: path required: true schema: type: integer responses: '200': description: Success content: application/json: schema: $ref: '#/components/schemas/Product' put: summary: Update a product parameters: - name: product_id in: path required: true schema: type: integer requestBody: required: true content: application/json: schema: $ref: '#/components/schemas/ProductUpdate' responses: '200': description: Success content: application/json: schema: $ref: '#/components/schemas/Product' delete: summary: Delete a product parameters: - name: product_id in: path required: true schema: type: integer responses: '204': description: No content components: schemas: Product: type: object properties: id: type: integer name: type: string description: type: string price: type: number stock: type: integer category: type: string created_at: type: string format: date-time updated_at: type: string format: date-time ProductCreate: type: object required: - name - price - stock - category properties: name: type: string description: type: string price: type: number stock: type: integer category: type: string ProductUpdate: type: object properties: name: type: string description: type: string price: type: number stock: type: integer category: type: string ```> Claude Code 生成的代码: ``` python from flask import Flask, request, jsonify from flask_sqlalchemy import SQLAlchemy from datetime import datetime from typing import Optional app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///products.db' db = SQLAlchemy(app) class Product(db.Model): """商品模型""" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(200), nullable=False) description = db.Column(db.Text) price = db.Column(db.Float, nullable=False) stock = db.Column(db.Integer, nullable=False) category = db.Column(db.String(100), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def to_dict(self): """转换为字典""" return { 'id': self.id, 'name': self.name, 'description': self.description, 'price': self.price, 'stock': self.stock, 'category': self.category, 'created_at': self.created_at.isoformat(), 'updated_at': self.updated_at.isoformat() } @app.route('/products', methods=['GET']) def get_products(): """获取所有商品""" page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 20, type=int) pagination = Product.query.paginate(page=page, per_page=per_page) return jsonify({ 'products': [product.to_dict() for product in pagination.items], 'total': pagination.total, 'page': page }) @app.route('/products', methods=['POST']) def create_product(): """创建商品""" data = request.get_json() # 验证必填字段 required_fields = ['name', 'price', 'stock', 'category'] for field in required_fields: if field not in data: return jsonify({'error': f'Missing required field: {field}'}), 400 # 验证数据类型 if not isinstance(data['price'], (int, float)) or data['price'] < 0: return jsonify({'error': 'Price must be a positive number'}), 400 if not isinstance(data['stock'], int) or data['stock'] < 0: return jsonify({'error': 'Stock must be a non-negative integer'}), 400 # 创建商品 product = Product( name=data['name'], description=data.get('description'), price=data['price'], stock=data['stock'], category=data['category'] ) db.session.add(product) db.session.commit() return jsonify(product.to_dict()), 201 @app.route('/products/<int:product_id>', methods=['GET']) def get_product(product_id): """获取商品详情""" product = Product.query.get_or_404(product_id) return jsonify(product.to_dict()) @app.route('/products/<int:product_id>', methods=['PUT']) def update_product(product_id): """更新商品""" product = Product.query.get_or_404(product_id) data = request.get_json() # 更新字段 if 'name' in data: product.name = data['name'] if 'description' in data: product.description = data['description'] if 'price' in data: if not isinstance(data['price'], (int, float)) or data['price'] < 0: return jsonify({'error': 'Price must be a positive number'}), 400 product.price = data['price'] if 'stock' in data: if not isinstance(data['stock'], int) or data['stock'] < 0: return jsonify({'error': 'Stock must be a non-negative integer'}), 400 product.stock = data['stock'] if 'category' in data: product.category = data['category'] product.updated_at = datetime.utcnow() db.session.commit() return jsonify(product.to_dict()) @app.route('/products/<int:product_id>', methods=['DELETE']) def delete_product(product_id): """删除商品""" product = Product.query.get_or_404(product_id) db.session.delete(product) db.session.commit() return '', 204 if __name__ == '__main__': with app.app_context(): db.create_all() app.run(debug=True) > **生成的代码包含**: > - 完整的 CRUD 操作 > - 数据验证 > - 分页功能 > - 错误处理 > - 符合 OpenAPI 规范 ## 代码补全 ### 1. 智能方法补全 ``` python # 示例:智能方法补全 用户输入: ```python ```python class OrderService: def __init__(self, db): self.db = db def create_order(self, user_id, items): """创建订单""" pass Claude Code 自动补全: ````python ````python class OrderService: def __init__(self, db): self.db = db def create_order(self, user_id, items): """ 创建订单 参数: user_id: 用户 ID items: 商品列表 [{'product_id': 1, 'quantity': 2}, ...] 返回: 订单对象 """ # 验证用户 user = self.db.session.query(User).filter_by(id=user_id).first() if not user: raise UserNotFoundError(f"User with id {user_id} not found") # 验证商品 product_ids = [item['product_id'] for item in items] products = self.db.session.query(Product).filter( Product.id.in_(product_ids) ).all() product_map = {product.id: product for product in products} for item in items: if item['product_id'] not in product_map: raise ProductNotFoundError( f"Product with id {item['product_id']} not found" ) product = product_map[item['product_id']] if product.stock < item['quantity']: raise InsufficientStockError( f"Insufficient stock for product {product.name}" ) # 计算总金额 total_amount = sum( product_map[item['product_id']].price * item['quantity'] for item in items ) # 创建订单 order = Order( user_id=user_id, total_amount=total_amount, status='pending' ) self.db.session.add(order) self.db.session.flush() # 创建订单项 for item in items: product = product_map[item['product_id']] order_item = OrderItem( order_id=order.id, product_id=item['product_id'], quantity=item['quantity'], price=product.price ) self.db.session.add(order_item) # 扣减库存 product.stock -= item['quantity'] self.db.session.commit() return order def get_order(self, order_id): """获取订单详情""" order = self.db.session.query(Order).filter_by(id=order_id).first() if not order: raise OrderNotFoundError(f"Order with id {order_id} not found") return order def update_order_status(self, order_id, status): """更新订单状态""" order = self.get_order(order_id) order.status = status order.updated_at = datetime.utcnow() self.db.session.commit() return order def cancel_order(self, order_id): """取消订单""" order = self.get_order(order_id) if order.status in ['paid', 'shipped', 'completed']: raise InvalidOrderStatusError( f"Cannot cancel order with status {order.status}" ) # 恢复库存 for item in order.items: product = self.db.session.query(Product).filter_by( id=item.product_id ).first() if product: product.stock += item.quantity order.status = 'cancelled' order.updated_at = datetime.utcnow() self.db.session.commit() return order def get_user_orders(self, user_id, page=1, per_page=20): """获取用户订单列表""" pagination = self.db.session.query(Order).filter_by( user_id=user_id ).order_by( Order.created_at.desc() ).paginate(page=page, per_page=per_page) return pagination ```> **补全的内容**: > - 完整的方法实现 > - 数据验证 > - 错误处理 > - 库存管理 > - 相关方法补全 ``` ### 2. 智能类补全 # 示例:智能类补全 用户输入: ````python `python class UserRepository: def __init__(self, db): self.db = db ```> Claude Code 自动补全: ``` python from typing import List, Optional from datetime import datetime class UserRepository: """用户仓储类""" def __init__(self, db): self.db = db def find_by_id(self, user_id: int) -> Optional[User]: """根据 ID 查找用户""" return self.db.session.query(User).filter_by(id=user_id).first() def find_by_username(self, username: str) -> Optional[User]: """根据用户名查找用户""" return self.db.session.query(User).filter_by(username=username).first() def find_by_email(self, email: str) -> Optional[User]: """根据邮箱查找用户""" return self.db.session.query(User).filter_by(email=email).first() def find_all(self, page: int = 1, per_page: int = 20): """查找所有用户""" return self.db.session.query(User).paginate( page=page, per_page=per_page ) def create(self, user: User) -> User: """创建用户""" self.db.session.add(user) self.db.session.commit() return user def update(self, user: User) -> User: """更新用户""" user.updated_at = datetime.utcnow() self.db.session.commit() return user def delete(self, user_id: int) -> bool: """删除用户""" user = self.find_by_id(user_id) if not user: return False self.db.session.delete(user) self.db.session.commit() return True def exists_by_username(self, username: str) -> bool: """检查用户名是否存在""" return self.db.session.query(User).filter_by( username=username ).first() is not None def exists_by_email(self, email: str) -> bool: """检查邮箱是否存在""" return self.db.session.query(User).filter_by( email=email ).first() is not None def count(self) -> int: """统计用户数量""" return self.db.session.query(User).count() def search(self, keyword: str, page: int = 1, per_page: int = 20): """搜索用户""" return self.db.session.query(User).filter( User.username.ilike(f'%{keyword}%') | User.email.ilike(f'%{keyword}%') ).paginate(page=page, per_page=per_page) > **补全的内容**: > - 完整的 CRUD 方法 > - 查询方法 > - 存在性检查 > - 搜索功能 > - 分页支持